/*
 * Copyright (c) Facebook, Inc. and its affiliates.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "velox/common/base/StatsReporter.h"
#include <folly/init/Init.h>
#include <gtest/gtest.h>
#include <cstdint>
#include <unordered_map>
#include "velox/common/base/Counters.h"
#include "velox/common/base/PeriodicStatsReporter.h"
#include "velox/common/base/tests/GTestUtils.h"
#include "velox/common/base/tests/StatsReporterUtils.h"
#include "velox/common/caching/AsyncDataCache.h"
#include "velox/common/caching/CacheTTLController.h"
#include "velox/common/caching/SsdCache.h"
#include "velox/common/memory/MmapAllocator.h"

namespace facebook::velox::test {

// Bundles the three vectors that the quantile registration calls in this file
// take, so a test can pass them around and compare them as one unit.
struct QuantileConfig {
  // Stat types handed to the register*QuantileMetricExportType() calls.
  std::vector<StatType> statTypes;
  // Percentiles handed to the same calls; the tests here use fractions such
  // as 0.5 and 0.95.
  std::vector<double> percentiles;
  // Sliding windows handed to the same calls. NOTE(review): presumably in
  // seconds, given the slidingWindowsSeconds(...) helper used by the
  // macro-based tests -- confirm against StatsReporter.h.
  std::vector<size_t> slidingWindows;
};

// Returns the default configuration used by the register-and-verify helpers
// when a test does not supply its own: AVG and COUNT stat types, the 0.5 and
// 0.95 percentiles, and sliding windows of 60 and 600.
inline QuantileConfig createStandardConfig() {
  QuantileConfig standard;
  standard.statTypes = {StatType::AVG, StatType::COUNT};
  standard.percentiles = {0.5, 0.95};
  standard.slidingWindows = {60, 600};
  return standard;
}

/// Fixture shared by the stats reporter tests. It binds 'reporter_' to the
/// TestReporter obtained from the BaseStatsReporter folly singleton and calls
/// clear() on it before and after every test.
class StatsReporterTest : public testing::Test {
 protected:
  void SetUp() override {
    // Set the registered flag to true so macros will work
    BaseStatsReporter::registered = true;
    reporter_ = std::dynamic_pointer_cast<TestReporter>(
        folly::Singleton<BaseStatsReporter>::try_get());
    // Both the singleton lookup and the downcast can yield null; fail the test
    // with a message instead of dereferencing a null pointer below.
    ASSERT_TRUE(reporter_ != nullptr)
        << "BaseStatsReporter singleton is unavailable or is not a TestReporter";
    reporter_->clear();
  }

  void TearDown() override {
    // TearDown still runs when SetUp reported a fatal failure, so 'reporter_'
    // may be null here.
    if (reporter_ != nullptr) {
      reporter_->clear();
    }
    // Reset the registered flag
    BaseStatsReporter::registered = false;
  }

 public:
  /// The reporter under test. Public so that the parameterized scenario
  /// lambdas, which only receive a StatsReporterTest*, can reach it.
  std::shared_ptr<TestReporter> reporter_;

  /// Expects that 'key' is registered as a quantile metric on 'reporter_' and
  /// that the recorded stat types, percentiles and sliding windows equal
  /// those in 'config'.
  void verifyQuantileRegistration(
      const std::string& key,
      const QuantileConfig& config) {
    EXPECT_TRUE(reporter_->quantileStatTypesMap.count(key));
    EXPECT_EQ(config.statTypes, reporter_->quantileStatTypesMap[key]);
    EXPECT_EQ(config.percentiles, reporter_->quantilePercentilesMap[key]);
    EXPECT_EQ(config.slidingWindows, reporter_->quantileSlidingWindowsMap[key]);
  }

  /// Same as verifyQuantileRegistration() but checks the maps that hold
  /// dynamic quantile registrations, keyed by the unexpanded 'pattern'.
  void verifyDynamicQuantileRegistration(
      const std::string& pattern,
      const QuantileConfig& config) {
    EXPECT_TRUE(reporter_->dynamicQuantileStatTypesMap.count(pattern));
    EXPECT_EQ(
        config.statTypes, reporter_->dynamicQuantileStatTypesMap[pattern]);
    EXPECT_EQ(
        config.percentiles, reporter_->dynamicQuantilePercentilesMap[pattern]);
    EXPECT_EQ(
        config.slidingWindows,
        reporter_->dynamicQuantileSlidingWindowsMap[pattern]);
  }

  /// Registers 'key' as a quantile metric with 'config' and then verifies the
  /// registration. 'KeyType' is whatever key type the caller passes, as long
  /// as it is convertible to std::string for the verification step.
  template <typename KeyType>
  void registerAndVerifyQuantile(
      KeyType key,
      const QuantileConfig& config = createStandardConfig()) {
    reporter_->registerQuantileMetricExportType(
        key, config.statTypes, config.percentiles, config.slidingWindows);
    verifyQuantileRegistration(std::string(key), config);
  }

  /// Registers 'pattern' as a dynamic quantile metric with 'config' and then
  /// verifies the registration.
  template <typename KeyType>
  void registerAndVerifyDynamicQuantile(
      KeyType pattern,
      const QuantileConfig& config = createStandardConfig()) {
    reporter_->registerDynamicQuantileMetricExportType(
        pattern, config.statTypes, config.percentiles, config.slidingWindows);
    verifyDynamicQuantileRegistration(std::string(pattern), config);
  }
};

// Defines one metric of each basic kind plus a histogram, records a few
// values, and checks what the TestReporter captured for the definitions and
// for the recorded values.
TEST_F(StatsReporterTest, trivialReporter) {
  DEFINE_METRIC("key1", StatType::COUNT);
  DEFINE_METRIC("key2", StatType::SUM);
  DEFINE_METRIC("key3", StatType::RATE);
  DEFINE_HISTOGRAM_METRIC("key4", 10, 0, 100, 50, 99, 100);

  // The definitions must be visible on the reporter.
  EXPECT_EQ(StatType::COUNT, reporter_->statTypeMap["key1"]);
  EXPECT_EQ(StatType::SUM, reporter_->statTypeMap["key2"]);
  EXPECT_EQ(StatType::RATE, reporter_->statTypeMap["key3"]);
  std::vector<int32_t> expected = {50, 99, 100};
  EXPECT_EQ(expected, reporter_->histogramPercentilesMap["key4"]);
  // A key that was never defined must not show up.
  EXPECT_TRUE(
      reporter_->statTypeMap.find("key5") == reporter_->statTypeMap.end());

  RECORD_METRIC_VALUE("key1", 10);
  RECORD_METRIC_VALUE("key1", 11);
  RECORD_METRIC_VALUE("key1", 15);
  RECORD_METRIC_VALUE("key2", 1001);
  RECORD_METRIC_VALUE("key2", 1200);
  // No explicit value; the expected total below accounts for it as 1.
  RECORD_METRIC_VALUE("key3");
  RECORD_METRIC_VALUE("key3", 1100);
  RECORD_HISTOGRAM_METRIC_VALUE("key4", 50);
  RECORD_HISTOGRAM_METRIC_VALUE("key4", 100);

  EXPECT_EQ(36, reporter_->counterMap["key1"]);
  EXPECT_EQ(2201, reporter_->counterMap["key2"]);
  EXPECT_EQ(1101, reporter_->counterMap["key3"]);
  EXPECT_EQ(100, reporter_->counterMap["key4"]);

  EXPECT_EQ(
      "[key1:36,key2:2201,key3:1101,key4:100]", reporter_->fetchMetrics());
}

// Fixture for the PeriodicStatsReporter tests. It adds nothing to
// StatsReporterTest; it only gives those tests their own suite name.
class PeriodicStatsReporterTest : public StatsReporterTest {};

class TestStatsReportMmapAllocator : public memory::MmapAllocator {
 public:
  TestStatsReportMmapAllocator(
      memory::MachinePageCount numMapped,
      memory::MachinePageCount numAllocated,
      memory::MachinePageCount numMallocBytes,
      memory::MachinePageCount numExternalMapped)
      : memory::MmapAllocator({.capacity = 1024}),
        numMapped_(numMapped),
        numAllocated_(numAllocated),
        numMallocBytes_(numMallocBytes),
        numExternalMapped_(numExternalMapped) {}

  memory::MachinePageCount numMapped() const override {
    return numMapped_;
  }

  memory::MachinePageCount numAllocated() const override {
    return numAllocated_;
  }

  uint64_t numMallocBytes() const {
    return numMallocBytes_;
  }

  memory::MachinePageCount numExternalMapped() const override {
    return numExternalMapped_;
  }

 private:
  memory::MachinePageCount numMapped_;
  memory::MachinePageCount numAllocated_;
  memory::MachinePageCount numMallocBytes_;
  memory::MachinePageCount numExternalMapped_;
};

/// AsyncDataCache test double that returns whatever CacheStats the test last
/// supplied. The base cache is constructed with two null arguments. Access to
/// the stored stats is guarded by a mutex because updateStats() and
/// refreshStats() may be called from different threads in these tests.
class TestStatsReportAsyncDataCache : public cache::AsyncDataCache {
 public:
  /// 'stats' is the value returned by refreshStats() until updateStats() is
  /// called. Marked explicit, like the sibling test doubles, so a CacheStats
  /// is never silently converted into a cache.
  explicit TestStatsReportAsyncDataCache(cache::CacheStats stats)
      : cache::AsyncDataCache(nullptr, nullptr), stats_(stats) {}

  /// Returns a copy of the currently stored stats.
  cache::CacheStats refreshStats() const override {
    std::lock_guard<std::mutex> l(mutex_);
    return stats_;
  }

  /// Replaces the stored stats with 'stats'.
  void updateStats(cache::CacheStats stats) {
    std::lock_guard<std::mutex> l(mutex_);
    stats_ = stats;
  }

 private:
  // Mutable so that the const refreshStats() can lock it.
  mutable std::mutex mutex_;
  cache::CacheStats stats_;
};

/// MemoryArbitrator test double. It reports whatever Stats the test last
/// supplied; the remaining arbitrator operations are no-ops, return zero, or
/// fail, as noted on each method.
class TestStatsReportMemoryArbitrator : public memory::MemoryArbitrator {
 public:
  /// 'stats' is returned by stats() until updateStats() replaces it. The base
  /// arbitrator is built from an empty initializer.
  explicit TestStatsReportMemoryArbitrator(
      memory::MemoryArbitrator::Stats stats)
      : memory::MemoryArbitrator({}), stats_(stats) {}

  ~TestStatsReportMemoryArbitrator() override = default;

  /// Returns a copy of the stored stats, taken under the mutex.
  Stats stats() const override {
    std::lock_guard<std::mutex> guard(mutex_);
    return stats_;
  }

  /// Replaces the stored stats under the mutex.
  void updateStats(memory::MemoryArbitrator::Stats stats) {
    std::lock_guard<std::mutex> guard(mutex_);
    stats_ = stats;
  }

  std::string kind() const override {
    return "test";
  }

  std::string toString() const override {
    return "TestStatsReportMemoryArbitrator::toString()";
  }

  /// No-op.
  void shutdown() override {}

  /// No-op.
  void addPool(const std::shared_ptr<memory::MemoryPool>& /*unused*/) override {
  }

  /// No-op.
  void removePool(memory::MemoryPool* /*unused*/) override {}

  /// Always fails: this test double cannot grow any pool.
  void growCapacity(memory::MemoryPool* /*unused*/, uint64_t /*unused*/)
      override {
    VELOX_FAIL("Cannot grow capacity.");
  }

  /// Always reports that nothing was shrunk.
  uint64_t shrinkCapacity(memory::MemoryPool* /*unused*/, uint64_t /*unused*/)
      override {
    return 0;
  }

  /// Always reports that nothing was shrunk.
  uint64_t shrinkCapacity(uint64_t /*unused*/, bool /*unused*/, bool /*unused*/)
      override {
    return 0;
  }

 private:
  // Mutable so that the const stats() accessor can lock it.
  mutable std::mutex mutex_;
  memory::MemoryArbitrator::Stats stats_;
};

/// MemoryPool test double in which every operation is a stub: allocation
/// functions return nullptr, numeric queries return 0, boolean queries return
/// false, string queries return "", and mutating calls do nothing. The pool
/// is constructed as an aggregate pool with an empty name and no parent.
class TestMemoryPool : public memory::MemoryPool {
 public:
  explicit TestMemoryPool() : MemoryPool("", Kind::kAggregate, nullptr, {}) {}

  // --- Allocation entry points: never allocate anything. ---

  void* allocate(
      int64_t /* unused */,
      std::optional<uint32_t> /* unused */) override {
    return nullptr;
  }

  void* allocateZeroFilled(int64_t /* unused */, int64_t /* unused */)
      override {
    return nullptr;
  }

  void* reallocate(
      void* /* unused */,
      int64_t /* unused */,
      int64_t /* unused */) override {
    return nullptr;
  }

  void free(void* /* unused */, int64_t /* unused */) override {}

  void allocateNonContiguous(
      memory::MachinePageCount /* unused */,
      memory::Allocation& /* unused */,
      memory::MachinePageCount /* unused */) override {}

  void freeNonContiguous(memory::Allocation& /* unused */) override {}

  memory::MachinePageCount largestSizeClass() const override {
    return 0;
  }

  // Returns a reference to a function-local static empty vector so the
  // returned reference stays valid after the call.
  const std::vector<memory::MachinePageCount>& sizeClasses() const override {
    static std::vector<memory::MachinePageCount> sizeClasses;
    return sizeClasses;
  }

  void allocateContiguous(
      memory::MachinePageCount /* unused */,
      memory::ContiguousAllocation& /* unused */,
      memory::MachinePageCount /* unused */) override {}

  void freeContiguous(memory::ContiguousAllocation& /* unused */) override {}

  void growContiguous(
      memory::MachinePageCount /* unused */,
      memory::ContiguousAllocation& /* unused */) override {}

  // --- Usage and capacity queries: always zero. ---

  int64_t capacity() const override {
    return 0;
  }

  int64_t usedBytes() const override {
    return 0;
  }

  int64_t peakBytes() const override {
    return 0;
  }

  int64_t availableReservation() const override {
    return 0;
  }

  int64_t releasableReservation() const override {
    return 0;
  }

  int64_t reservedBytes() const override {
    return 0;
  }

  // --- Reservation and capacity changes: never succeed. ---

  bool maybeReserve(uint64_t /* unused */) override {
    return false;
  }

  void release() override {}

  uint64_t freeBytes() const override {
    return 0;
  }

  uint64_t shrink(uint64_t /* unused */) override {
    return 0;
  }

  bool grow(uint64_t /* unused */, uint64_t /* unused */) override {
    return false;
  }

  // --- Reclaim and arbitration hooks: no reclaimer, nothing reclaimable. ---

  void setReclaimer(
      std::unique_ptr<memory::MemoryReclaimer> /* unused */) override {}
  memory::MemoryReclaimer* reclaimer() const override {
    return nullptr;
  }

  void enterArbitration() override {}

  void leaveArbitration() noexcept override {}

  std::optional<uint64_t> reclaimableBytes() const override {
    return std::nullopt;
  }

  uint64_t reclaim(
      uint64_t /* unused */,
      uint64_t /* unused */,
      memory::MemoryReclaimer::Stats& /* unused */) override {
    return 0;
  }

  void abort(const std::exception_ptr& /* unused */) override {}

  bool aborted() const override {
    return false;
  }

  // --- Introspection: empty strings and default-constructed stats. ---

  std::string toString(bool /* unused */) const override {
    return "";
  }

  std::string treeMemoryUsage(bool /* unused */) const override {
    return "";
  }

  // Never creates a child pool.
  std::shared_ptr<MemoryPool> genChild(
      std::shared_ptr<MemoryPool> /* unused */,
      const std::string& /* unused */,
      Kind /* unused */,
      bool /* unused */,
      const std::function<size_t(size_t)>& /* unused */,
      std::unique_ptr<memory::MemoryReclaimer> /* unused */) override {
    return nullptr;
  }

  Stats stats() const override {
    return Stats();
  }
};

// Runs a PeriodicStatsReporter against stubbed cache, allocator, arbitrator
// and spill pool objects and checks which metrics show up in the TestReporter:
// first only the snapshot metrics, and, once the stubs report changed stats,
// the delta metrics as well.
TEST_F(PeriodicStatsReporterTest, basic) {
  // Metrics expected to be present after the first reporting round.
  const std::vector<std::string> snapshotMetrics = {
      std::string(kMetricArbitratorFreeCapacityBytes),
      std::string(kMetricArbitratorFreeReservedCapacityBytes),
      std::string(kMetricMemoryCacheNumTinyEntries),
      std::string(kMetricMemoryCacheNumLargeEntries),
      std::string(kMetricMemoryCacheNumEmptyEntries),
      std::string(kMetricMemoryCacheNumSharedEntries),
      std::string(kMetricMemoryCacheNumExclusiveEntries),
      std::string(kMetricMemoryCacheNumPrefetchedEntries),
      std::string(kMetricMemoryCacheTotalTinyBytes),
      std::string(kMetricMemoryCacheTotalLargeBytes),
      std::string(kMetricMemoryCacheTotalTinyPaddingBytes),
      std::string(kMetricMemoryCacheTotalLargePaddingBytes),
      std::string(kMetricMemoryCacheTotalPrefetchBytes),
      std::string(kMetricSsdCacheCachedEntries),
      std::string(kMetricSsdCacheCachedRegions),
      std::string(kMetricSsdCacheCachedBytes),
      std::string(kMetricCacheMaxAgeSecs),
      std::string(kMetricMemoryAllocatorMappedBytes),
      std::string(kMetricMemoryAllocatorAllocatedBytes),
      std::string(kMetricMmapAllocatorDelegatedAllocatedBytes),
      std::string(kMetricMemoryAllocatorExternalMappedBytes),
      std::string(kMetricSpillMemoryBytes),
      std::string(kMetricSpillPeakMemoryBytes),
      std::string(kMetricMemoryAllocatorTotalUsedBytes),
  };

  // Metrics expected to be absent until the stubbed stats change, and present
  // afterwards.
  const std::vector<std::string> deltaMetrics = {
      std::string(kMetricMemoryCacheNumHits),
      std::string(kMetricMemoryCacheHitBytes),
      std::string(kMetricMemoryCacheNumNew),
      std::string(kMetricMemoryCacheNumEvicts),
      std::string(kMetricMemoryCacheNumSavableEvicts),
      std::string(kMetricMemoryCacheNumEvictChecks),
      std::string(kMetricMemoryCacheNumWaitExclusive),
      std::string(kMetricMemoryCacheNumAllocClocks),
      std::string(kMetricMemoryCacheNumAgedOutEntries),
      std::string(kMetricMemoryCacheSumEvictScore),
      std::string(kMetricSsdCacheReadEntries),
      std::string(kMetricSsdCacheReadBytes),
      std::string(kMetricSsdCacheWrittenEntries),
      std::string(kMetricSsdCacheWrittenBytes),
      std::string(kMetricSsdCacheOpenSsdErrors),
      std::string(kMetricSsdCacheOpenCheckpointErrors),
      std::string(kMetricSsdCacheOpenLogErrors),
      std::string(kMetricSsdCacheMetaFileDeleteErrors),
      std::string(kMetricSsdCacheGrowFileErrors),
      std::string(kMetricSsdCacheWriteSsdErrors),
      std::string(kMetricSsdCacheWriteSsdDropped),
      std::string(kMetricSsdCacheWriteCheckpointErrors),
      std::string(kMetricSsdCacheReadSsdErrors),
      std::string(kMetricSsdCacheReadCorruptions),
      std::string(kMetricSsdCacheReadCheckpointErrors),
      std::string(kMetricSsdCacheCheckpointsRead),
      std::string(kMetricSsdCacheCheckpointsWritten),
      std::string(kMetricSsdCacheRegionsEvicted),
      std::string(kMetricSsdCacheAgedOutEntries),
      std::string(kMetricSsdCacheAgedOutRegions),
      std::string(kMetricSsdCacheRecoveredEntries),
      std::string(kMetricSsdCacheReadWithoutChecksum),
  };

  TestStatsReportMmapAllocator allocator(1, 1, 1, 1);
  TestStatsReportAsyncDataCache cache(
      {.ssdStats = std::make_shared<cache::SsdCacheStats>()});
  cache::CacheTTLController::create(cache);
  TestStatsReportMemoryArbitrator arbitrator({});
  TestMemoryPool spillMemoryPool;
  PeriodicStatsReporter::Options options;
  options.cache = &cache;
  options.cacheStatsIntervalMs = 4'000;
  options.allocator = &allocator;
  options.allocatorStatsIntervalMs = 4'000;
  options.arbitrator = &arbitrator;
  options.arbitratorStatsIntervalMs = 4'000;
  options.spillMemoryPool = &spillMemoryPool;
  options.spillStatsIntervalMs = 4'000;
  PeriodicStatsReporter periodicReporter(options);

  periodicReporter.start();
  // The assertions below rely on a first report having landed within this
  // wait, which is shorter than the 4s reporting intervals configured above.
  std::this_thread::sleep_for(std::chrono::milliseconds(2'000));

  // Check snapshot stats
  const auto& counterMap = reporter_->counterMap;
  {
    // The reporter is still running, so read the map under its mutex.
    std::lock_guard<std::mutex> l(reporter_->m);
    for (const auto& name : snapshotMetrics) {
      ASSERT_EQ(counterMap.count(name), 1) << "snapshot metric: " << name;
    }
    // Check deltas are not reported
    for (const auto& name : deltaMetrics) {
      ASSERT_EQ(counterMap.count(name), 0) << "delta metric: " << name;
    }
    // Nothing besides the snapshot metrics may have been reported.
    ASSERT_EQ(counterMap.size(), snapshotMetrics.size());
  }

  // Update stats
  auto newSsdStats = std::make_shared<cache::SsdCacheStats>();
  newSsdStats->entriesWritten = 10;
  newSsdStats->bytesWritten = 10;
  newSsdStats->checkpointsWritten = 10;
  newSsdStats->entriesRead = 10;
  newSsdStats->bytesRead = 10;
  newSsdStats->checkpointsRead = 10;
  newSsdStats->entriesAgedOut = 10;
  newSsdStats->regionsAgedOut = 10;
  newSsdStats->regionsEvicted = 10;
  newSsdStats->numPins = 10;
  newSsdStats->openFileErrors = 10;
  newSsdStats->openCheckpointErrors = 10;
  newSsdStats->openLogErrors = 10;
  newSsdStats->deleteMetaFileErrors = 10;
  newSsdStats->growFileErrors = 10;
  newSsdStats->writeSsdErrors = 10;
  newSsdStats->writeSsdDropped = 10;
  newSsdStats->writeCheckpointErrors = 10;
  newSsdStats->readSsdErrors = 10;
  newSsdStats->readSsdCorruptions = 10;
  newSsdStats->readCheckpointErrors = 10;
  newSsdStats->readWithoutChecksumChecks = 10;
  newSsdStats->entriesRecovered = 10;
  cache.updateStats(
      {.numHit = 10,
       .hitBytes = 10,
       .numNew = 10,
       .numEvict = 10,
       .numSavableEvict = 10,
       .numEvictChecks = 10,
       .numWaitExclusive = 10,
       .numAgedOut = 10,
       .allocClocks = 10,
       .sumEvictScore = 10,
       .ssdStats = newSsdStats});
  arbitrator.updateStats(
      memory::MemoryArbitrator::Stats(
          10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10));
  // Wait a full reporting interval so the updated stats are picked up.
  std::this_thread::sleep_for(std::chrono::milliseconds(4'000));

  // Stop right after a sufficient wait so that the following reads from the
  // main thread do not trigger TSAN failures.
  periodicReporter.stop();

  // Check delta stats are reported
  {
    std::lock_guard<std::mutex> l(reporter_->m);
    for (const auto& name : deltaMetrics) {
      ASSERT_EQ(counterMap.count(name), 1) << "delta metric: " << name;
    }
    // Snapshot and delta metrics together must account for every entry.
    ASSERT_EQ(counterMap.size(), snapshotMetrics.size() + deltaMetrics.size());
  }
}

// Starts and stops a PeriodicStatsReporter built from default-constructed
// options and expects neither call to throw. The local arbitrator the test
// used to construct was never handed to the options, so it has been removed.
TEST_F(PeriodicStatsReporterTest, globalInstance) {
  PeriodicStatsReporter::Options options;
  PeriodicStatsReporter periodicReporter(options);
  ASSERT_NO_THROW(periodicReporter.start());
  // Keep the reporter running for a while before stopping it.
  std::this_thread::sleep_for(std::chrono::milliseconds(4'000));
  ASSERT_NO_THROW(periodicReporter.stop());
}

// Exercises the free start/stop functions with default-constructed options:
// stopping before starting and starting twice must both throw with the
// expected messages, while a single start followed by a stop must not.
TEST_F(PeriodicStatsReporterTest, allNullOption) {
  PeriodicStatsReporter::Options defaultOptions;

  // Nothing has been started yet.
  VELOX_ASSERT_THROW(
      stopPeriodicStatsReporter(), "No periodic stats reporter to stop.");

  // First start succeeds, second one is rejected.
  ASSERT_NO_THROW(startPeriodicStatsReporter(defaultOptions));
  VELOX_ASSERT_THROW(
      startPeriodicStatsReporter(defaultOptions),
      "The periodic stats reporter has already started.");

  ASSERT_NO_THROW(stopPeriodicStatsReporter());
}

// Registers one quantile metric with three stat types, three percentiles and
// two sliding windows, and verifies the reporter stored exactly that.
TEST_F(StatsReporterTest, registerQuantileMetricExportType) {
  QuantileConfig quantileConfig;
  quantileConfig.statTypes = {StatType::AVG, StatType::SUM, StatType::COUNT};
  quantileConfig.percentiles = {0.5, 0.95, 0.99};
  quantileConfig.slidingWindows = {60, 600};

  registerAndVerifyQuantile("test_quantile_stat", quantileConfig);
}

// Registers quantile metrics through each supported key type (C string,
// std::string and folly::StringPiece), then adds values under each key and
// checks the totals the TestReporter accumulated.
TEST_F(StatsReporterTest, quantileRegistrationWithValues) {
  const char* cstrKey = "test_quantile_char";
  std::string strKey = "test_quantile_string";
  folly::StringPiece pieceKey("test_quantile_sp");

  QuantileConfig cstrConfig;
  cstrConfig.statTypes = {StatType::AVG, StatType::COUNT};
  cstrConfig.percentiles = {0.5, 0.95};
  cstrConfig.slidingWindows = {60, 600};

  QuantileConfig strConfig;
  strConfig.statTypes = {StatType::SUM, StatType::COUNT};
  strConfig.percentiles = {0.75, 0.99};
  strConfig.slidingWindows = {300};

  QuantileConfig pieceConfig;
  pieceConfig.statTypes = {StatType::RATE};
  pieceConfig.percentiles = {0.75, 0.90};
  pieceConfig.slidingWindows = {3600};

  // Registration, one call per key type.
  registerAndVerifyQuantile(cstrKey, cstrConfig);
  registerAndVerifyQuantile(strKey, strConfig);
  registerAndVerifyQuantile(pieceKey, pieceConfig);

  // Value addition, again one key type at a time.
  reporter_->addQuantileMetricValue(cstrKey, 100);
  reporter_->addQuantileMetricValue(cstrKey, 50);
  EXPECT_EQ(150, reporter_->counterMap[std::string(cstrKey)]);

  reporter_->addQuantileMetricValue(strKey, 200);
  reporter_->addQuantileMetricValue(strKey, 300);
  EXPECT_EQ(500, reporter_->counterMap[strKey]);

  reporter_->addQuantileMetricValue(pieceKey, 25);
  reporter_->addQuantileMetricValue(pieceKey, 75);
  EXPECT_EQ(100, reporter_->counterMap[std::string(pieceKey)]);
}

// Registers two quantile metrics with different configurations and verifies
// each registration right after it is made.
TEST_F(StatsReporterTest, multipleQuantileStats) {
  QuantileConfig smallConfig;
  smallConfig.statTypes = {StatType::AVG};
  smallConfig.percentiles = {0.5};
  smallConfig.slidingWindows = {60};

  QuantileConfig largeConfig;
  largeConfig.statTypes = {StatType::COUNT, StatType::SUM};
  largeConfig.percentiles = {0.95, 0.99, 0.999};
  largeConfig.slidingWindows = {300, 900};

  registerAndVerifyQuantile("metric1", smallConfig);
  registerAndVerifyQuantile("metric2", largeConfig);
}

// Registering a quantile metric with no stat types, percentiles or sliding
// windows must not throw, and the reporter must store the empty vectors.
TEST_F(StatsReporterTest, emptyVectors) {
  // A default-constructed config carries three empty vectors.
  const QuantileConfig noSettings{};

  EXPECT_NO_THROW(reporter_->registerQuantileMetricExportType(
      "empty_metric",
      noSettings.statTypes,
      noSettings.percentiles,
      noSettings.slidingWindows));

  EXPECT_TRUE(reporter_->quantileStatTypesMap.count("empty_metric"));
  EXPECT_TRUE(reporter_->quantileStatTypesMap["empty_metric"].empty());
  EXPECT_TRUE(reporter_->quantilePercentilesMap["empty_metric"].empty());
  EXPECT_TRUE(reporter_->quantileSlidingWindowsMap["empty_metric"].empty());
}

// Covers the macro front end for quantile stats: DEFINE_QUANTILE_STAT must
// register the metric, and RECORD_QUANTILE_STAT_VALUE must add values to it,
// with and without an explicit value.
TEST_F(StatsReporterTest, quantileStatMacros) {
  DEFINE_QUANTILE_STAT(
      "macro_test_stat",
      statTypes(StatType::AVG, StatType::COUNT),
      percentiles(0.5, 0.95, 0.99),
      slidingWindowsSeconds(60, 600));

  QuantileConfig wanted;
  wanted.statTypes = {StatType::AVG, StatType::COUNT};
  wanted.percentiles = {0.5, 0.95, 0.99};
  wanted.slidingWindows = {60, 600};
  verifyQuantileRegistration("macro_test_stat", wanted);

  RECORD_QUANTILE_STAT_VALUE("macro_test_stat", 100);
  RECORD_QUANTILE_STAT_VALUE("macro_test_stat", 50);
  RECORD_QUANTILE_STAT_VALUE("macro_test_stat"); // Default value of 1

  // 100 + 50 + 1
  EXPECT_EQ(151, reporter_->counterMap["macro_test_stat"]);
}

// Registers dynamic quantile patterns through each supported key type (C
// string, std::string and folly::StringPiece), then adds values with concrete
// subkeys and checks the totals stored under the expanded metric names.
TEST_F(StatsReporterTest, dynamicQuantileRegistrationWithValues) {
  const char* cstrPattern = "test_metric_char.{}.{}";
  std::string strPattern = "test_metric_string.{}.{}";
  folly::StringPiece piecePattern("test_metric_sp.{}.{}");

  QuantileConfig cstrConfig;
  cstrConfig.statTypes = {StatType::AVG};
  cstrConfig.percentiles = {0.95};
  cstrConfig.slidingWindows = {300};

  QuantileConfig strConfig;
  strConfig.statTypes = {StatType::COUNT};
  strConfig.percentiles = {0.5};
  strConfig.slidingWindows = {60};

  QuantileConfig pieceConfig;
  pieceConfig.statTypes = {StatType::SUM};
  pieceConfig.percentiles = {0.5, 0.99};
  pieceConfig.slidingWindows = {60, 600};

  registerAndVerifyDynamicQuantile(cstrPattern, cstrConfig);
  registerAndVerifyDynamicQuantile(strPattern, strConfig);
  registerAndVerifyDynamicQuantile(piecePattern, pieceConfig);

  auto dbSubkeys = subkeys("db1", "table1");
  auto serverSubkeys = subkeys("server1", "endpoint1");
  auto regionSubkeys = subkeys("region1", "service1");

  // C-string pattern: 50 + 150.
  reporter_->addDynamicQuantileMetricValue(cstrPattern, dbSubkeys, 50);
  reporter_->addDynamicQuantileMetricValue(cstrPattern, dbSubkeys, 150);
  EXPECT_EQ(200, reporter_->counterMap["test_metric_char.db1.table1"]);

  // std::string pattern: 100 + 200.
  reporter_->addDynamicQuantileMetricValue(strPattern, serverSubkeys, 100);
  reporter_->addDynamicQuantileMetricValue(strPattern, serverSubkeys, 200);
  EXPECT_EQ(300, reporter_->counterMap["test_metric_string.server1.endpoint1"]);

  // StringPiece pattern: 75 + 125.
  reporter_->addDynamicQuantileMetricValue(piecePattern, regionSubkeys, 75);
  reporter_->addDynamicQuantileMetricValue(piecePattern, regionSubkeys, 125);
  EXPECT_EQ(200, reporter_->counterMap["test_metric_sp.region1.service1"]);
}

// Registering a dynamic quantile pattern must not, by itself, create a
// counter entry for any expanded metric name.
TEST_F(StatsReporterTest, dynamicQuantileRegistrationOnly) {
  const char* pattern = "registration_test.{}.{}";

  QuantileConfig registrationConfig;
  registrationConfig.statTypes = {StatType::AVG, StatType::COUNT};
  registrationConfig.percentiles = {0.5, 0.95};
  registrationConfig.slidingWindows = {60, 600};

  registerAndVerifyDynamicQuantile(pattern, registrationConfig);

  // No value was recorded, so no expanded key may exist.
  EXPECT_EQ(0, reporter_->counterMap.count("registration_test.key1.key2"));
}

// Adding a value for a dynamic quantile pattern that was never registered
// must neither throw nor leave a counter entry behind.
TEST_F(StatsReporterTest, dynamicQuantileMetricUnregisteredPattern) {
  auto unknownSubkeys = subkeys("unregistered", "pattern");

  EXPECT_NO_THROW(reporter_->addDynamicQuantileMetricValue(
      "unregistered_pattern.{}.{}", unknownSubkeys, 42));

  // The expanded metric name must be absent from the counters.
  EXPECT_EQ(
      0,
      reporter_->counterMap.count("unregistered_pattern.unregistered.pattern"));
}

// One scenario for the parameterized DynamicQuantilePatternTest suite.
struct DynamicQuantilePatternTestCase {
  // Human-readable label of the scenario.
  std::string testName;
  // Scenario body. It is invoked with the running fixture and reaches the
  // reporter through the fixture's public 'reporter_' member.
  std::function<void(StatsReporterTest*)> testFunc;
};

// Parameterized fixture: reuses StatsReporterTest's setup and teardown and
// takes a DynamicQuantilePatternTestCase as its test parameter.
class DynamicQuantilePatternTest
    : public StatsReporterTest,
      public testing::WithParamInterface<DynamicQuantilePatternTestCase> {};

// Runs the scenario carried by the test parameter against this fixture.
TEST_P(DynamicQuantilePatternTest, PatternScenarios) {
  GetParam().testFunc(this);
}

// Scenarios for DynamicQuantilePatternTest. Each case registers one or more
// dynamic quantile patterns, adds values with concrete subkeys, and checks
// the totals stored under the expanded metric names. The trailing name
// generator labels every instance with its 'testName' so that test output
// shows e.g. ".../ComplexPattern" rather than a bare index.
INSTANTIATE_TEST_SUITE_P(
    PatternScenarios,
    DynamicQuantilePatternTest,
    testing::Values(
        // Three additions of 1 to the same expanded key.
        DynamicQuantilePatternTestCase{
            "DefaultValue",
            [](StatsReporterTest* test) {
              test->reporter_->registerDynamicQuantileMetricExportType(
                  "default_value_metric.{}",
                  statTypes(StatType::COUNT),
                  percentiles(0.5),
                  slidingWindowsSeconds(60));

              auto subkeyValues = subkeys("test");
              test->reporter_->addDynamicQuantileMetricValue(
                  "default_value_metric.{}", subkeyValues, 1);
              test->reporter_->addDynamicQuantileMetricValue(
                  "default_value_metric.{}", subkeyValues, 1);
              test->reporter_->addDynamicQuantileMetricValue(
                  "default_value_metric.{}", subkeyValues, 1);

              EXPECT_EQ(
                  3, test->reporter_->counterMap["default_value_metric.test"]);
            }},
        // Two patterns with a different number of placeholders.
        DynamicQuantilePatternTestCase{
            "MultiplePatterns",
            [](StatsReporterTest* test) {
              test->reporter_->registerDynamicQuantileMetricExportType(
                  "pattern1.{}.{}",
                  statTypes(StatType::AVG),
                  percentiles(0.5),
                  slidingWindowsSeconds(60));

              test->reporter_->registerDynamicQuantileMetricExportType(
                  "pattern2.{}.{}.{}",
                  statTypes(StatType::COUNT, StatType::SUM),
                  percentiles(0.95, 0.99, 0.999),
                  slidingWindowsSeconds(300, 900));

              auto subkeys2 = subkeys("key1", "key2");
              auto subkeys3 = subkeys("key1", "key2", "key3");

              test->reporter_->addDynamicQuantileMetricValue(
                  "pattern1.{}.{}", subkeys2, 100);
              test->reporter_->addDynamicQuantileMetricValue(
                  "pattern2.{}.{}.{}", subkeys3, 200);

              EXPECT_EQ(100, test->reporter_->counterMap["pattern1.key1.key2"]);
              EXPECT_EQ(
                  200, test->reporter_->counterMap["pattern2.key1.key2.key3"]);
            }},
        // Placeholders interleaved with literal segments.
        DynamicQuantilePatternTestCase{
            "ComplexPattern",
            [](StatsReporterTest* test) {
              test->reporter_->registerDynamicQuantileMetricExportType(
                  "complex.{}.latency.{}.p95",
                  statTypes(StatType::AVG, StatType::COUNT),
                  percentiles(0.95, 0.99),
                  slidingWindowsSeconds(60, 300, 3600));

              auto subkeyValues = subkeys("http_server", "endpoint_api");
              test->reporter_->addDynamicQuantileMetricValue(
                  "complex.{}.latency.{}.p95", subkeyValues, 150);
              test->reporter_->addDynamicQuantileMetricValue(
                  "complex.{}.latency.{}.p95", subkeyValues, 200);

              EXPECT_EQ(
                  350,
                  test->reporter_->counterMap
                      ["complex.http_server.latency.endpoint_api.p95"]);
            }},
        // A pattern with a single placeholder.
        DynamicQuantilePatternTestCase{
            "SingleSubkey",
            [](StatsReporterTest* test) {
              test->reporter_->registerDynamicQuantileMetricExportType(
                  "single_sub.{}",
                  statTypes(StatType::RATE),
                  percentiles(0.75),
                  slidingWindowsSeconds(600));

              auto subkeyValues = subkeys("single_value");
              test->reporter_->addDynamicQuantileMetricValue(
                  "single_sub.{}", subkeyValues, 300);

              EXPECT_EQ(
                  300, test->reporter_->counterMap["single_sub.single_value"]);
            }},
        // One pattern expanded with several subkey combinations; each
        // combination must accumulate independently.
        DynamicQuantilePatternTestCase{
            "MultipleInstances",
            [](StatsReporterTest* test) {
              test->reporter_->registerDynamicQuantileMetricExportType(
                  "instance_test.{}.{}",
                  statTypes(StatType::SUM),
                  percentiles(0.5),
                  slidingWindowsSeconds(60));

              auto subkeys1 = subkeys("instance1", "metric1");
              auto subkeys2 = subkeys("instance2", "metric2");
              auto subkeys3 = subkeys("instance1", "metric2");

              test->reporter_->addDynamicQuantileMetricValue(
                  "instance_test.{}.{}", subkeys1, 100);
              test->reporter_->addDynamicQuantileMetricValue(
                  "instance_test.{}.{}", subkeys2, 200);
              test->reporter_->addDynamicQuantileMetricValue(
                  "instance_test.{}.{}", subkeys3, 300);
              test->reporter_->addDynamicQuantileMetricValue(
                  "instance_test.{}.{}", subkeys1, 50);

              EXPECT_EQ(
                  150,
                  test->reporter_
                      ->counterMap["instance_test.instance1.metric1"]);
              EXPECT_EQ(
                  200,
                  test->reporter_
                      ->counterMap["instance_test.instance2.metric2"]);
              EXPECT_EQ(
                  300,
                  test->reporter_
                      ->counterMap["instance_test.instance1.metric2"]);
            }}),
    // Name generator: the labels above are unique and purely alphanumeric,
    // as GoogleTest requires for generated test names.
    [](const testing::TestParamInfo<DynamicQuantilePatternTest::ParamType>&
           info) { return info.param.testName; });

// Registering a dynamic quantile pattern with no stat types, percentiles or
// sliding windows must not throw, the reporter must store the empty vectors,
// and adding a value for that pattern afterwards must not throw either.
TEST_F(StatsReporterTest, dynamicQuantileMetricEmptyVectors) {
  // A default-constructed config carries three empty vectors.
  const QuantileConfig noSettings{};

  EXPECT_NO_THROW(reporter_->registerDynamicQuantileMetricExportType(
      "empty_metric.{}",
      noSettings.statTypes,
      noSettings.percentiles,
      noSettings.slidingWindows));

  EXPECT_TRUE(reporter_->dynamicQuantileStatTypesMap.count("empty_metric.{}"));
  EXPECT_TRUE(
      reporter_->dynamicQuantileStatTypesMap["empty_metric.{}"].empty());
  EXPECT_TRUE(
      reporter_->dynamicQuantilePercentilesMap["empty_metric.{}"].empty());
  EXPECT_TRUE(
      reporter_->dynamicQuantileSlidingWindowsMap["empty_metric.{}"].empty());

  // Recording against the empty registration must still be accepted.
  auto testSubkeys = subkeys("test");
  EXPECT_NO_THROW(reporter_->addDynamicQuantileMetricValue(
      "empty_metric.{}", testSubkeys, 100));
}

// Covers the macro front end for dynamic quantile stats:
// DEFINE_DYNAMIC_QUANTILE_STAT must register the pattern, and
// RECORD_DYNAMIC_QUANTILE_STAT_VALUE must add values under the expanded name,
// with and without an explicit value.
TEST_F(StatsReporterTest, dynamicQuantileStatMacros) {
  DEFINE_DYNAMIC_QUANTILE_STAT(
      "macro_test.{}.{}",
      statTypes(StatType::AVG, StatType::COUNT),
      percentiles(0.5, 0.95, 0.99),
      slidingWindowsSeconds(60, 600));

  QuantileConfig wanted;
  wanted.statTypes = {StatType::AVG, StatType::COUNT};
  wanted.percentiles = {0.5, 0.95, 0.99};
  wanted.slidingWindows = {60, 600};
  verifyDynamicQuantileRegistration("macro_test.{}.{}", wanted);

  RECORD_DYNAMIC_QUANTILE_STAT_VALUE(
      "macro_test.{}.{}", subkeys("service", "method"), 100);
  RECORD_DYNAMIC_QUANTILE_STAT_VALUE(
      "macro_test.{}.{}", subkeys("service", "method"), 50);
  RECORD_DYNAMIC_QUANTILE_STAT_VALUE(
      "macro_test.{}.{}", subkeys("service", "method")); // Default value of 1

  // 100 + 50 + 1
  EXPECT_EQ(151, reporter_->counterMap["macro_test.service.method"]);
}

// Registers a TestReporter as the BaseStatsReporter folly singleton. The test
// fixture fetches it through folly::Singleton<BaseStatsReporter>::try_get()
// and downcasts it to TestReporter. The creator returns a raw pointer
// allocated with 'new'.
// NOTE(review): presumably folly::Singleton takes ownership of that pointer --
// confirm against the folly Singleton documentation.
folly::Singleton<BaseStatsReporter> reporter([]() {
  return new TestReporter();
});

} // namespace facebook::velox::test

// Test binary entry point: initializes GoogleTest and folly, marks the stats
// reporter as registered, and runs every test.
int main(int argc, char** argv) {
  // GoogleTest is initialized first, with the original command line.
  testing::InitGoogleTest(&argc, argv);
  // NOTE(review): the trailing 'false' is presumably folly::Init's
  // 'removeFlags' argument -- confirm against folly/init/Init.h.
  folly::Init init{&argc, &argv, false};
  // The fixture also sets this flag in SetUp() and resets it in TearDown();
  // setting it here covers code that runs outside a fixture.
  facebook::velox::BaseStatsReporter::registered = true;
  return RUN_ALL_TESTS();
}
